{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "fFjof1NgAJwu"
      },
      "outputs": [],
      "source": [
        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "A8xNRyZMW1yK"
      },
      "source": [
        "# Apache Beam RunInference with TensorFlow\n",
        "\n",
        "<table align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
        "  </td>\n",
        "</table>\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "HrCtxslBGK8Z"
      },
      "source": [
        "This notebook shows how to use the Apache Beam [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) transform for [TensorFlow](https://www.tensorflow.org/).\n",
        "Apache Beam has built-in support for two TensorFlow model handlers: [`TFModelHandlerNumpy`](https://github.com/apache/beam/blob/ca0787642a6b3804a742326147281c99ae8d08d2/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L91) and [`TFModelHandlerTensor`](https://github.com/apache/beam/blob/ca0787642a6b3804a742326147281c99ae8d08d2/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184).\n",
        "\n",
        "* Use `TFModelHandlerNumpy` to run inference on models that use a `numpy` array as an input.\n",
        "* Use `TFModelHandlerTensor` to run inference on models that use a `tf.Tensor` as an input.\n",
        "\n",
        "If your model uses `tf.Example` as an input, see the [Apache Beam RunInference with `tfx-bsl`](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow_with_tfx.ipynb) notebook.\n",
        "\n",
        "There are three ways to load a TensorFlow model:\n",
        "1. Provide a path to the saved model.\n",
        "2. Provide a path to the saved weights of the model.\n",
        "3. Provide a URL for pretrained model on TensorFlow Hub. For an example workflow, see [Apache Beam RunInference with TensorFlow and TensorFlow Hub](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_with_tensorflow_hub.ipynb).\n",
        "\n",
        "This notebook demonstrates the following steps:\n",
        "- Build a simple TensorFlow model.\n",
        "- Set up example data.\n",
        "- Run those examples with the built-in model handlers using one of the following methods, and then get a prediction inside an Apache Beam pipeline.:\n",
        "  *   a saved model\n",
        "  *   saved weights\n",
        "\n",
        "For more information about using RunInference, see [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gVCtGOKTHMm4"
      },
      "source": [
        "## Before you begin\n",
        "Set up your environment and download dependencies."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YDHPlMjZRuY0"
      },
      "source": [
        "### Install Apache Beam\n",
        "To use RunInference with the built-in Tensorflow model handler, install Apache Beam version 2.46.0 or later."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "jBakpNZnAhqk"
      },
      "outputs": [],
      "source": [
        "!pip install protobuf --quiet\n",
        "!pip install apache_beam[interactive]==2.46.0 --quiet\n",
        "\n",
        "# To use the newly installed versions, restart the runtime.\n",
        "exit()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "X80jy3FqHjK4"
      },
      "source": [
        "### Authenticate with Google Cloud\n",
        "This notebook relies on saving your model to Google Cloud. To use your Google Cloud account, authenticate this notebook."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 2,
      "metadata": {
        "id": "Kz9sccyGBqz3"
      },
      "outputs": [],
      "source": [
        "from google.colab import auth\n",
        "auth.authenticate_user()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "40qtP6zJuMXm"
      },
      "source": [
        "### Import dependencies and set up your bucket\n",
        "Use the following code to import dependencies and to set up your Google Cloud Storage bucket.\n",
        "\n",
        "Replace `PROJECT_ID` and `BUCKET_NAME` with the ID of your project and the name of your bucket."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 22,
      "metadata": {
        "id": "eEle839_Akqx"
      },
      "outputs": [],
      "source": [
        "import argparse\n",
        "from typing import Dict, Text, Any, Tuple, List\n",
        "import numpy\n",
        "\n",
        "from google.protobuf import text_format\n",
        "\n",
        "import tensorflow as tf\n",
        "from tensorflow import keras\n",
        "import apache_beam as beam\n",
        "from apache_beam.ml.inference.base import RunInference\n",
        "from apache_beam.ml.inference.base import KeyedModelHandler\n",
        "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy\n",
        "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "\n",
        "project = \"PROJECT_ID\" # @param {type:'string'}\n",
        "bucket = \"BUCKET_NAME\" # @param {type:'string'}\n",
        "\n",
        "save_model_dir_multiply = f'gs://{bucket}/tf-inference/model/multiply_five/v1/'\n",
        "save_weights_dir_multiply = f'gs://{bucket}/tf-inference/weights/multiply_five/v1/'\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YzvZWEv-1oiK"
      },
      "source": [
        "## Create and test a simple model\n",
        "\n",
        "This step creates and tests a model that predicts the 5 times table."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "rIwD_qEpX7Gu"
      },
      "source": [
        "### Create the model\n",
        "Create training data and build a linear regression model."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 16,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "SH7iq3zeBBJ-",
        "outputId": "5a3d3ce4-f9d8-4d87-a1bc-05afc3c9b06e"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "Model: \"model_1\"\n",
            "_________________________________________________________________\n",
            " Layer (type)                Output Shape              Param #   \n",
            "=================================================================\n",
            " x (InputLayer)              [(None, 1)]               0         \n",
            "                                                                 \n",
            " dense_1 (Dense)             (None, 1)                 2         \n",
            "                                                                 \n",
            "=================================================================\n",
            "Total params: 2\n",
            "Trainable params: 2\n",
            "Non-trainable params: 0\n",
            "_________________________________________________________________\n"
          ]
        }
      ],
      "source": [
        "# Create training data that represents the 5 times multiplication table for the numbers 0 to 99.\n",
        "# x is the data and y is the labels.\n",
        "x = numpy.arange(0, 100)   # Examples\n",
        "y = x * 5                  # Labels\n",
        "\n",
        "# Use create_model to build a simple linear regression model.\n",
        "# Note that the model has a shape of (1) for its input layer and expects a single int64 value.\n",
        "def create_model():\n",
        "  input_layer = keras.layers.Input(shape=(1), dtype=tf.float32, name='x')\n",
        "  output_layer= keras.layers.Dense(1)(input_layer)\n",
        "  model = keras.Model(input_layer, output_layer)\n",
        "  model.compile(optimizer=tf.optimizers.Adam(), loss='mean_absolute_error')\n",
        "  return model\n",
        "\n",
        "model = create_model()\n",
        "model.summary()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "O_a0-4Gb19cy"
      },
      "source": [
        "### Test the model\n",
        "\n",
        "This step tests the model that you created."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 17,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "5XkIYXhJBFmS",
        "outputId": "ad2ff8a9-522c-41f4-e5d8-dbdcb53b0ded"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "1/1 [==============================] - 0s 38ms/step\n",
            "Test Examples [20, 40, 60, 90]\n",
            "Predictions [[21.896107]\n",
            " [41.795692]\n",
            " [61.69528 ]\n",
            " [91.544655]]\n"
          ]
        }
      ],
      "source": [
        "model.fit(x, y, epochs=500, verbose=0)\n",
        "test_examples =[20, 40, 60, 90]\n",
        "value_to_predict = numpy.array(test_examples, dtype=numpy.float32)\n",
        "predictions = model.predict(value_to_predict)\n",
        "\n",
        "print('Test Examples ' + str(test_examples))\n",
        "print('Predictions ' + str(predictions))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Y3BC2aY8cIMI"
      },
      "source": [
        "### Save the model\n",
        "\n",
        "This step shows how to save your model."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 18,
      "metadata": {
        "id": "2JbE7WkGcAkK"
      },
      "outputs": [],
      "source": [
        "model.save(save_model_dir_multiply)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "g_qVtXPeUcMS"
      },
      "source": [
        "Instead of saving the entire model, you can [save the model weights for inference](https://www.tensorflow.org/guide/keras/save_and_serialize#saving_loading_only_the_models_weights_values). You can use this method when you need the model for inference but don't need any compilation information or optimizer state. In addition, when using transfer learning applications, you can use this method to load the weights with new models.\n",
        "\n",
        "With this approach, you need to pass the function to build the TensorFlow model to the `TFModelHandler` class that you're using, either`TFModelHandlerNumpy` or `TFModelHandlerTensor`. You also need to pass `model_type=ModelType.SAVED_WEIGHTS` to the class.\n",
        "\n",
        "\n",
        "\n",
        "```\n",
        "model_handler = TFModelHandlerNumpy(path_to_weights, model_type=ModelType.SAVED_WEIGHTS, create_model_fn=build_tensorflow_model)\n",
        "```\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 19,
      "metadata": {
        "id": "Kl1C_NwaUbiv"
      },
      "outputs": [],
      "source": [
        "model.save_weights(save_weights_dir_multiply)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "0a1zerXycQ0z"
      },
      "source": [
        "## Run the pipeline\n",
        "Use the following code to run the pipeline by specifying path to the trained TensorFlow model."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 20,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 124
        },
        "id": "St07XoibcQSb",
        "outputId": "d36f77f2-d07e-4868-f4cb-d120ab54e653"
      },
      "outputs": [
        {
          "name": "stderr",
          "output_type": "stream",
          "text": [
            "WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.\n"
          ]
        },
        {
          "data": {
            "application/javascript": "\n        if (typeof window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = document.createElement('script');\n          jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = function() {\n            var datatableScript = document.createElement('script');\n            datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            datatableScript.type = 'text/javascript';\n            datatableScript.onload = function() {\n              window.interactive_beam_jquery = jQuery.noConflict(true);\n              window.interactive_beam_jquery(document).ready(function($){\n                \n              });\n            }\n            document.head.appendChild(datatableScript);\n          };\n          document.head.appendChild(jqueryScript);\n        } else {\n          window.interactive_beam_jquery(document).ready(function($){\n            \n          });\n        }"
          },
          "metadata": {},
          "output_type": "display_data"
        },
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "example is 20.0 prediction is [21.896107]\n",
            "example is 40.0 prediction is [41.795692]\n",
            "example is 60.0 prediction is [61.69528]\n",
            "example is 90.0 prediction is [91.544655]\n"
          ]
        }
      ],
      "source": [
        "class FormatOutput(beam.DoFn):\n",
        "  def process(self, element, *args, **kwargs):\n",
        "     yield \"example is {example} prediction is {prediction}\".format(example=element.example, prediction=element.inference)\n",
        "\n",
        "\n",
        "examples = numpy.array([20, 40, 60, 90], dtype=numpy.float32)\n",
        "model_handler = TFModelHandlerNumpy(save_model_dir_multiply)\n",
        "with beam.Pipeline() as p:\n",
        "    _ = (p | beam.Create(examples)\n",
        "           | RunInference(model_handler)\n",
        "           | beam.ParDo(FormatOutput())\n",
        "           | beam.Map(print)\n",
        "        )"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "0lbPamYGV8E6"
      },
      "source": [
        "Use the following code to run the pipeline with the saved weights of a TensorFlow model.\n",
        "\n",
        "To load the model with saved weights, the `TFModelHandlerNumpy` class requires a `create_model` function that builds and returns a TensorFlow model that is compatible with the saved weights."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 21,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "QQam0O4cWG42",
        "outputId": "d2ab8603-cdc7-4cd6-b909-e6edfeaa5422"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "example is 20.0 prediction is [21.896107]\n",
            "example is 40.0 prediction is [41.795692]\n",
            "example is 60.0 prediction is [61.69528]\n",
            "example is 90.0 prediction is [91.544655]\n"
          ]
        }
      ],
      "source": [
        "from apache_beam.ml.inference.tensorflow_inference import ModelType\n",
        "examples = numpy.array([20, 40, 60, 90], dtype=numpy.float32)\n",
        "model_handler = TFModelHandlerNumpy(save_weights_dir_multiply, model_type=ModelType.SAVED_WEIGHTS, create_model_fn=create_model)\n",
        "with beam.Pipeline() as p:\n",
        "    _ = (p | beam.Create(examples)\n",
        "           | RunInference(model_handler)\n",
        "           | beam.ParDo(FormatOutput())\n",
        "           | beam.Map(print)\n",
        "        )"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "tRLArcjOcYuO"
      },
      "source": [
        "## Use a keyed model handler\n",
        "To use a keyed model handler, use `KeyedModelHandler` with TensorFlow by using `TFModelHandlerNumpy`.\n",
        "\n",
        "By default, the `ModelHandler` does not expect a key.\n",
        "\n",
        "* If you know that keys are associated with your examples, use `beam.KeyedModelHandler` to wrap the model handler.\n",
        "* If you don't know whether keys are associated with your examples, use `beam.MaybeKeyedModelHandler`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "P6l9RwL2cAW3",
        "outputId": "03459fea-7d0a-4501-93cb-18bbad915d13"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "1.0 : ['example is 20.0 prediction is [51.815357]']\n",
            "2.0 : ['example is 40.0 prediction is [101.63492]']\n",
            "3.0 : ['example is 60.0 prediction is [151.45448]']\n",
            "4.0 : ['example is 90.0 prediction is [226.18384]']\n"
          ]
        }
      ],
      "source": [
        "class FormatOutputKeyed(FormatOutput):\n",
        "  # To simplify, inherit from FormatOutput.\n",
        "  def process(self, tuple_in: Tuple):\n",
        "    key, element = tuple_in\n",
        "    output = super().process(element)\n",
        "    yield \"{} : {}\".format(key, [op for op in output])\n",
        "\n",
        "examples = numpy.array([(1,20), (2,40), (3,60), (4,90)], dtype=numpy.float32)\n",
        "keyed_model_handler = KeyedModelHandler(TFModelHandlerNumpy(save_model_dir_multiply))\n",
        "with beam.Pipeline() as p:\n",
        "    _ = (p | 'CreateExamples' >> beam.Create(examples)\n",
        "           | RunInference(keyed_model_handler)\n",
        "           | beam.ParDo(FormatOutputKeyed())\n",
        "           | beam.Map(print)\n",
        "        )"
      ]
    }
  ],
  "metadata": {
    "accelerator": "GPU",
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
